//OpenSCADA module Archive.DBArch file: mess.cpp
/***************************************************************************
 *   Copyright (C) 2007-2023 by Roman Savochenko, <roman@oscada.org>       *
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; version 2 of the License.               *
 *                                                                         *
 *   This program is distributed in the hope that it will be useful,       *
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
 *   GNU General Public License for more details.                          *
 *                                                                         *
 *   You should have received a copy of the GNU General Public License     *
 *   along with this program; if not, write to the                         *
 *   Free Software Foundation, Inc.,                                       *
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
 ***************************************************************************/

#include <sys/time.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <tsys.h>
#include "arch.h"
#include "mess.h"

using namespace DBArch;

//************************************************
//* DBArch::ModMArch - Messages archivator       *
//************************************************
ModMArch::ModMArch( const string &iid, const string &idb, TElem *cf_el ) :
    TMArchivator(iid, idb, cf_el), tmProc(0), tmProcMax(0), mBeg(0), mEnd(0), mMaxSize(0),
    mTmAsStr(false), mKeyTmCat(true), needMeta(true)
{
    setAddr(DB_GEN);
}

ModMArch::~ModMArch( )
{
    try { stop(); } catch(...) { }
}

TCntrNode &ModMArch::operator=( const TCntrNode &node )
{
    const TMArchivator *src_n = dynamic_cast<const TMArchivator*>(&node);
    if(!src_n) return *this;

    //Configuration copy
    exclCopy(*src_n, "ID;START;");
    cfg("MODUL").setS(owner().modId());
    setDB(src_n->DB());

    //TMArchivator::operator=(node);
    load_();

    return *this;
}

void ModMArch::postDisable( int flag )
{
    TMArchivator::postDisable(flag);

    if(flag&NodeRemove) {
	//Remove info record
	TConfig cfg(&mod->archEl());
	cfg.cfg("TBL").setS(archTbl(),true);
	TBDS::dataDel(addr()+"."+mod->mainTbl(), "", cfg);

	//Removing the archive DB table
	TBDS::dataDelTbl(addr()+"."+archTbl());
    }
}

void ModMArch::load_( )
{
    //TMArchivator::load_();

    //Init address to DB
    if(addr().empty()) setAddr(DB_GEN);

    try {
	XMLNode prmNd;
	string  vl;
	prmNd.load(cfg("A_PRMS").getS());
	if(!(vl=prmNd.attr("Size")).empty())	setMaxSize(s2r(vl));
	if(!(vl=prmNd.attr("TmAsStr")).empty())	setTmAsStr(s2i(vl));
	if(!(vl=prmNd.attr("KeyTmCat")).empty()) setKeyTmCat(s2i(vl));
    } catch(...) { }

    needMeta = !readMeta();
}

void ModMArch::save_( )
{
    XMLNode prmNd("prms");
    prmNd.setAttr("Size", r2s(maxSize()));
    prmNd.setAttr("TmAsStr", i2s(tmAsStr()));
    prmNd.setAttr("KeyTmCat", i2s(keyTmCat()));
    cfg("A_PRMS").setS(prmNd.save(XMLNode::BrAllPast));

    TMArchivator::save_();
}

void ModMArch::start( )
{
    if(!runSt) {
	reqEl.fldClear();
	reqEl.fldAdd(new TFld("MIN",trS("In minutes"),TFld::Integer,TCfg::Key,"8"));	//Mostly for fast reading next, by minutes
	reqEl.fldAdd(new TFld("TM",trS("Time, seconds"),TFld::Integer,TCfg::Key|(tmAsStr()?TFld::DateTimeDec:0),(tmAsStr()?"20":"10")));
	reqEl.fldAdd(new TFld("TMU",trS("Time, microseconds"),TFld::Integer,TCfg::Key,"6","0"));
	reqEl.fldAdd(new TFld("CATEG",trS("Category"),TFld::String,TCfg::Key,"200"));
	reqEl.fldAdd(new TFld("MESS",trS("Message"),TFld::String,(keyTmCat()?(int)TFld::NoFlag:(int)TCfg::Key),(keyTmCat()?"100000":"255")));
	reqEl.fldAdd(new TFld("LEV",trS("Level"),TFld::Integer,TFld::NoFlag,"2"));
    }

    //Connection to DB and enable status check
    string wdb = TBDS::realDBName(addr());
    AutoHD<TBD> db = SYS->db().at().nodeAt(wdb, 0, '.');
    try { if(!db.at().enableStat()) db.at().enable(); }
    catch(TError &err) { mess_warning(nodePath().c_str(), _("Error enabling the target DB: %s"), err.mess.c_str()); }

    TMArchivator::start();
}

void ModMArch::stop( )
{
    TMArchivator::stop();

    reqEl.fldClear();
}

time_t ModMArch::begin( )
{
    return mBeg;
}

time_t ModMArch::end( )
{
    return mEnd;
}

bool ModMArch::put( vector<TMess::SRec> &mess, bool force )
{
    if(needMeta && (needMeta=!readMeta()))	return false;

    TMArchivator::put(mess, force);	//Allow redundancy

    if(!runSt) throw TError(nodePath(), _("The archive is not started!"));

    AutoHD<TTable> tbl = TBDS::tblOpen(addr()+"."+archTbl(), true);
    if(tbl.freeStat()) return false;

    TConfig cfg(&reqEl);
    int64_t t_cnt = TSYS::curTime();
    for(unsigned i_m = 0; i_m < mess.size(); i_m++) {
	if(!chkMessOK(mess[i_m].categ,mess[i_m].level)) continue;

	//Put record to DB
	cfg.cfg("MIN").setI(mess[i_m].time/60);
	cfg.cfg("TM").setI(mess[i_m].time);
	cfg.cfg("TMU").setI(mess[i_m].utime);
	cfg.cfg("CATEG").setS(mess[i_m].categ);
	cfg.cfg("MESS").setS(mess[i_m].mess);
	cfg.cfg("LEV").setI(mess[i_m].level);
	tbl.at().fieldSet(cfg);
	//Archive time border update
	mBeg = mBeg ? vmin(mBeg,mess[i_m].time) : mess[i_m].time;
	mEnd = mEnd ? vmax(mEnd,mess[i_m].time) : mess[i_m].time;
    }

    //Archive size limit process
    if(maxSize() && (mEnd-mBeg) > (time_t)(maxSize()*86400)) {
	time_t nEnd = mEnd - (time_t)(maxSize()*86400);
	cfg.cfg("TM").setKeyUse(false);
	for(int tC = mBeg/60; tC < nEnd/60; tC++) {
	    cfg.cfg("MIN").setI(tC, true);
	    tbl.at().fieldDel(cfg);
	}
	mBeg = nEnd;
    }
    tbl.free();
    //SYS->db().at().close(addr()+"."+archTbl());	//!!! No close the table manually

    //Update archive info
    cfg.setElem(&mod->archEl());
    cfg.cfgViewAll(false);
    cfg.cfg("TBL").setS(archTbl(),true);
    cfg.cfg("BEGIN").setS(i2s(mBeg),true);
    cfg.cfg("END").setS(i2s(mEnd),true);
    bool rez = TBDS::dataSet(addr()+"."+mod->mainTbl(), "", cfg, TBDS::NoException);

    tmProc = TSYS::curTime() - t_cnt; tmProcMax = vmax(tmProcMax, tmProc);

    return rez;
}

time_t ModMArch::get( time_t bTm, time_t eTm, vector<TMess::SRec> &mess, const string &category, char level, time_t upTo )
{
    if(!runSt) throw TError(nodePath(), _("The archive is not started!"));
    if(needMeta && (needMeta=!readMeta())) return eTm;
    if(!upTo) upTo = SYS->sysTm() + prmInterf_TM;

    bTm = vmax(bTm, begin());
    eTm = vmin(eTm, end());
    if(eTm < bTm) return eTm;

    TConfig cfg(&reqEl);
    TRegExp re(category, "p");

    //Get values from DB
    cfg.cfg("TM").setKeyUse(false);
    time_t result = bTm;
    for(time_t tC = bTm; tC/60 <= eTm/60 && SYS->sysTm() < upTo; ) {
	tC = (tC/60)*60;
	cfg.cfg("MIN").setI(tC/60, true);
	int eC = 0;
	for( ; TBDS::dataSeek(addr()+"."+archTbl(),"",eC++,cfg,TBDS::UseCache) && SYS->sysTm() < upTo; ) {
	    TMess::SRec rc(cfg.cfg("TM").getI(), cfg.cfg("TMU").getI(), cfg.cfg("CATEG").getS(),
			    (TMess::Type)cfg.cfg("LEV").getI(), cfg.cfg("MESS").getS());
	    if(rc.time >= bTm && rc.time <= eTm && TMess::messLevelTest(level,rc.level) && re.test(rc.categ)) {
		bool equal = false;
		int i_p = mess.size();
		for(int i_m = mess.size()-1; i_m >= 0; i_m--) {
		    if(FTM(mess[i_m]) > FTM(rc)) i_p = i_m;
		    else if(FTM(mess[i_m]) == FTM(rc) && rc.level == mess[i_m].level && rc.mess == mess[i_m].mess)
		    { equal = true; break; }
		    else if(FTM(mess[i_m]) < FTM(rc)) break;
		}
		if(!equal) {
		    mess.insert(mess.begin()+i_p, rc);
		    if(SYS->sysTm() >= upTo) return result;
		}
	    }
	}
	tC += 60;
	if(SYS->sysTm() < upTo) result = vmax(bTm, vmin(eTm,tC-1));
    }

    return result;
}

bool ModMArch::readMeta( )
{
    bool rez = true;

    //Load message archive parameters
    TConfig wcfg(&mod->archEl());
    wcfg.cfg("TBL").setS(archTbl());
    if(TBDS::dataGet(addr()+"."+mod->mainTbl(),"",wcfg,TBDS::NoException)) {
	mBeg = s2i(wcfg.cfg("BEGIN").getS());
	mEnd = s2i(wcfg.cfg("END").getS());
	// Check for delete archivator table
	if(maxSize() && mEnd <= (time(NULL)-(time_t)(maxSize()*86400))) {
	    TBDS::dataDelTbl(addr()+"."+archTbl());
	    mBeg = mEnd = 0;
	}
    } else rez = false;

    //Check for target DB enabled (disabled by the connection loss)
    if(!rez) {
	string wDB = TBDS::realDBName(addr());
	rez = (TSYS::strParse(wDB,0,".") == DB_CFG ||
	    SYS->db().at().at(TSYS::strParse(wDB,0,".")).at().at(TSYS::strParse(wDB,1,".")).at().enableStat());
    }

    return rez;
}

void ModMArch::cntrCmdProc( XMLNode *opt )
{
    //Get page info
    if(opt->name() == "info") {
	TMArchivator::cntrCmdProc(opt);
	ctrRemoveNode(opt,"/prm/cfg/A_PRMS");
	ctrMkNode("fld",opt,-1,"/prm/st/tarch",_("Archiving time"),R_R_R_,"root",SARH_ID,1,"tp","str");
	ctrMkNode("fld",opt,-1,"/prm/cfg/ADDR",EVAL_STR,startStat()?R_R_R_:RWRWR_,"root",SARH_ID,3,
	    "dest","select","select","/db/list:onlydb","help",TMess::labStor().c_str());
	if(ctrMkNode("area",opt,-1,"/prm/add",_("Additional options"),R_R_R_,"root",SARH_ID)) {
	    ctrMkNode("fld",opt,-1,"/prm/add/sz",_("Archive size, days"),RWRWR_,"root",SARH_ID,2,
		"tp","real", "help",_("Set to 0 to disable this limit and to rise some the performance."));
	    ctrMkNode("fld",opt,-1,"/prm/add/tmAsStr",_("To form time as a string"),startStat()?R_R_R_:RWRWR_,"root",SARH_ID,2,
		"tp","bool", "help",_("Only for databases that support such by means of specific data types like \"datetime\" in MySQL."));
	    ctrMkNode("fld",opt,-1,"/prm/add/keyTmCat",_("Unique and non duple messages for time and category only"),startStat()?R_R_R_:RWRWR_,"root",SARH_ID,2,
		"tp","bool", "help",_("Otherwise the message field is included to the primary key and is limited in 255 symbols."));
	}
	return;
    }

    //Process command to page
    string a_path = opt->attr("path");
    if(a_path == "/prm/st/tarch" && ctrChkNode(opt))	opt->setText(tm2s(1e-6*tmProc) + "[" + tm2s(1e-6*tmProcMax) + "]");
    else if(a_path == "/prm/add/sz") {
	if(ctrChkNode(opt,"get",RWRWR_,"root",SARH_ID,SEC_RD))	opt->setText(r2s(maxSize()));
	if(ctrChkNode(opt,"set",RWRWR_,"root",SARH_ID,SEC_WR))	setMaxSize(s2r(opt->text()));
    }
    else if(a_path == "/prm/add/tmAsStr") {
	if(ctrChkNode(opt,"get",RWRWR_,"root",SARH_ID,SEC_RD))	opt->setText(i2s(tmAsStr()));
	if(ctrChkNode(opt,"set",RWRWR_,"root",SARH_ID,SEC_WR))	setTmAsStr(s2i(opt->text()));
    }
    else if(a_path == "/prm/add/keyTmCat") {
	if(ctrChkNode(opt,"get",RWRWR_,"root",SARH_ID,SEC_RD))	opt->setText(i2s(keyTmCat()));
	if(ctrChkNode(opt,"set",RWRWR_,"root",SARH_ID,SEC_WR))	setKeyTmCat(s2i(opt->text()));
    }
    else TMArchivator::cntrCmdProc(opt);
}