Source code for dlk.data.processors.basic

# Copyright 2021 cstsunfu. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hjson
import pandas as pd
from typing import Union, Dict
from dlk.data.processors import IProcessor, processor_config_register, processor_register
from dlk.data.subprocessors import subprocessor_config_register, subprocessor_register
from dlk.utils.config import BaseConfig
from dlk.utils.logger import Logger
from dlk.utils.io import open

logger = Logger.get_logger()


@processor_config_register('basic')
class BasicProcessorConfig(BaseConfig):
    """Config for BasicProcessor

    Config Example:
        >>> {
        >>>     // the input should be {"train": train, "valid": valid, ...}; each split
        >>>     // (train/valid/test/predict/online, etc.) should be a dataframe and must
        >>>     // have a column named "sentence"
        >>>     "_name": "basic@test_text_cls",
        >>>     "config": {
        >>>         "feed_order": ["load", "tokenizer", "token_gather", "label_to_id", "token_embedding", "save"]
        >>>     },
        >>>     "subprocessor@load": {
        >>>         "_base": "load",
        >>>         "config": {
        >>>             "base_dir": "",
        >>>             "predict": {
        >>>                 "meta": "./meta.pkl",
        >>>             },
        >>>             "online": [
        >>>                 "predict", // based on "predict"
        >>>                 {
        >>>                     // a special config that updates "predict"; here it is empty, so
        >>>                     // all config comes from "predict". When this dict is empty you
        >>>                     // can simply set the value to the string "predict" instead; both
        >>>                     // give the same result.
        >>>                 }
        >>>             ]
        >>>         }
        >>>     },
        >>>     "subprocessor@save": {
        >>>         "_base": "save",
        >>>         "config": {
        >>>             "base_dir": "",
        >>>             "train": {
        >>>                 "processed": "processed_data.pkl", // all the processed data
        >>>                 "meta": {
        >>>                     "meta.pkl": ['label_vocab'] // saved only for later runs
        >>>                 }
        >>>             },
        >>>             "predict": {
        >>>                 "processed": "processed_data.pkl",
        >>>             }
        >>>         }
        >>>     },
        >>>     "subprocessor@tokenizer": {
        >>>         "_base": "fast_tokenizer",
        >>>         "config": {
        >>>             "train": {
        >>>                 "config_path": "*@*",
        >>>                 "prefix": "",
        >>>                 "data_type": "single", // single or pair; if not provided, inferred from len(process_data)
        >>>                 "process_data": [
        >>>                     ["sentence", {"is_pretokenized": false}],
        >>>                 ],
        >>>                 "post_processor": "default",
        >>>                 "filed_map": { // maps tokenizer output keys to new names; this is the default, you can provide other names
        >>>                     "ids": "input_ids",
        >>>                 },
        >>>             },
        >>>             "predict": "train",
        >>>             "online": "train"
        >>>         }
        >>>     },
        >>>     "subprocessor@token_gather": {
        >>>         "_base": "token_gather",
        >>>         "config": {
        >>>             "train": { // used only in the train stage
        >>>                 "data_set": { // the parts of the data this processor handles at each stage
        >>>                     "train": ["train", "valid"]
        >>>                 },
        >>>                 "gather_columns": ["label"], // list of columns; every cell must be a single token or a list/set of tokens
        >>>                 "deliver": "label_vocab", // name of the output Vocabulary object (the vocabulary of labels)
        >>>             }
        >>>         }
        >>>     },
        >>>     "subprocessor@label_to_id": {
        >>>         "_base": "token2id",
        >>>         "config": {
        >>>             "train": { // config for the train/predict/online stages; join stage names with '&' to share one config
        >>>                 "data_pair": {
        >>>                     "label": "label_id"
        >>>                 },
        >>>                 "data_set": { // the parts of the data this processor handles at each stage
        >>>                     "train": ['train', 'valid', 'test'],
        >>>                     "predict": ['predict'],
        >>>                     "online": ['online']
        >>>                 },
        >>>                 "vocab": "label_vocab", // usually provided by the "token_gather" module
        >>>             },
        >>>             "predict": "train",
        >>>             "online": "train",
        >>>         }
        >>>     },
        >>>     "subprocessor@token_embedding": {
        >>>         "_base": "token_embedding",
        >>>         "config": {
        >>>             "train": { // used only in the train stage
        >>>                 "embedding_file": "*@*",
        >>>                 "tokenizer": "*@*",
        >>>                 "deliver": "token_embedding", // name of the output embedding
        >>>                 "embedding_size": 200,
        >>>             }
        >>>         }
        >>>     },
        >>> }
    """

    def __init__(self, stage, config: Union[Dict, str]):
        # the config may also be a path to an hjson file; load it before
        # handing the resolved dict to BaseConfig
        if isinstance(config, str):
            with open(config) as f:
                config = hjson.load(f, object_pairs_hook=dict)
        super(BasicProcessorConfig, self).__init__(config)
        self.feed_order = config["config"]['feed_order']
        self.subprocessors = config
        self.stage = stage
        # self.post_check(config['config'], used=['feed_order'])
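
# A minimal sketch (hypothetical values) of the dict shape BasicProcessorConfig
# expects: every name in "feed_order" must have a matching "subprocessor@<name>"
# section, since BasicProcessor.__init__ below looks each one up by that key.
# Whether the sub-configs shown here are complete depends on each subprocessor.
#
#   minimal_config = {
#       "_name": "basic",
#       "config": {"feed_order": ["load", "save"]},
#       "subprocessor@load": {"_name": "load", "config": {...}},  # resolved subprocessor config
#       "subprocessor@save": {"_name": "save", "config": {...}},
#   }
#   basic_config = BasicProcessorConfig(stage="train", config=minimal_config)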


@processor_register('basic')
class BasicProcessor(IProcessor):
    """Basic and General Processor"""

    def __init__(self, stage: str, config: BasicProcessorConfig):
        super(BasicProcessor, self).__init__()
        self._name = config._name
        self.stage = stage
        self.feed_order = config.feed_order
        assert len(self.feed_order) > 0
        self.subprocessors = {}
        for name in self.feed_order:
            subprocessor_config_dict = config.subprocessors[f'subprocessor@{name}']
            logger.info(f"Init '{name}' ....")
            subprocessor_name = subprocessor_config_dict["_name"]
            subprocessor_config = subprocessor_config_register.get(subprocessor_name)(
                stage=self.stage, config=subprocessor_config_dict)
            subprocessor = subprocessor_register.get(subprocessor_name)(
                stage=self.stage, config=subprocessor_config)
            self.subprocessors[name] = subprocessor

    def process(self, data: Dict) -> Dict:
        """Process entry

        Args:
            data:
                >>> {
                >>>     "data": {"train": ...},
                >>>     "tokenizer": ..
                >>> }

        Returns:
            processed data

        """
        logger.info("Start Data Processing....")
        for name in self.feed_order:
            if self.stage != 'online':
                logger.info(f"Processing on '{name}' ....")
            data = self.subprocessors[name].process(data)
        logger.info("Data Processed.")
        return data
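
# A minimal end-to-end usage sketch; "./processor.hjson", train_df, and valid_df
# are hypothetical, and the processor registers are used the same way the
# subprocessor registers are used in BasicProcessor.__init__ above.
#
#   with open("./processor.hjson") as f:
#       config_dict = hjson.load(f, object_pairs_hook=dict)
#   processor_config = processor_config_register.get("basic")(stage="train", config=config_dict)
#   processor = processor_register.get("basic")(stage="train", config=processor_config)
#   processed = processor.process({"data": {"train": train_df, "valid": valid_df}})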